In [1]:
import random
import sys
import time
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from itertools import product
from typing import Optional
import matplotlib as mpl
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import pandas as pd
from IPython.display import Markdown
from sklearn.preprocessing import normalize
from tqdm.notebook import tqdm
In [2]:
%matplotlib inline
mpl.rcParams['figure.figsize'] = [8.0, 3.0]
In [3]:
print(time.asctime())
In [4]:
print(sys.version)
In [5]:
# My favorite seed
np.random.seed(127)
random.seed(127)
In [6]:
def draw(graph):
    edges = graph.edges()
    pos = nx.spring_layout(graph)
    colors = []
    for u, v, attrib_dict in graph.edges.data():
        colors.append('blue' if attrib_dict['weight'] == 1 else 'red')
    nx.draw(graph, pos=pos, edgelist=list(edges), edge_color=colors, node_size=60)


def assign_bernoulli_polarity(graph, p: float = 0.5) -> None:
    """Bigger probability means more positive edges."""
    for u, v, k in graph.edges(keys=True):
        graph.edges[u, v, k]['weight'] = 1 if random.random() < p else -1
In [7]:
# Insulation parameters to check
alphas = (
    0.1,
    0.01,
    0.001,
)
n_subplots_x = 3
n_subplots_y = (len(alphas) + n_subplots_x - 1) // n_subplots_x  # ceiling division
n_subplots_x, n_subplots_y
Out[7]:
(3, 1)
Definitions:
A directed graph $G$ is defined as:
$G = (V, E)$
where the edges $E$ are a subset of pairs of vertices $V$:
$E \subseteq V \times V$
Edges $(V_i, V_j) \in E$ are weighted according to a weighting function $w$:
$w: V \times V \to \{-1, 0, 1\}$
where edges with positive polarity have weight $w(V_i, V_j) = 1$, edges with negative polarity have weight $w(V_i, V_j) = -1$, and edges missing from the graph have weight $w(V_i, V_j) = 0$. More succinctly, the weights can be represented with the weight matrix $W$, defined as
$W_{i,j} = w(V_i, V_j)$
Nodes have initial heats, represented as a vector $h^0 \in \mathbb{R}^{|V|}$.
Heat flows through the out-edges of $V_i$, divided evenly among its neighbors. This first means that $W$ must be row-wise normalized (using the L1 norm). It can be redefined as:
$W_{i,j} = \frac{w(V_i, V_j)}{\sum_{k=1}^{|V|} |w(V_i, V_k)|}$
The absolute values in the denominator matter because weights can be negative. Luckily, sklearn.preprocessing.normalize with norm='l1' does exactly this.
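As a quick illustration, here is a minimal sketch (hypothetical values, mirroring Example 1 below) of what that row-wise L1 normalization does to a small signed weight matrix; note that all-zero rows, i.e. sinks, stay zero:
# Minimal sketch: row-wise L1 normalization of a hypothetical signed weight matrix.
# Each row is divided by the sum of its absolute values; all-zero rows stay zero.
import numpy as np
from sklearn.preprocessing import normalize

w_example = np.array([
    [0.0, -1.0, -1.0],  # two negative out-edges
    [0.0,  0.0,  1.0],  # one positive out-edge
    [0.0,  0.0,  0.0],  # a sink: no out-edges
])
print(normalize(w_example, norm='l1'))
# [[ 0.  -0.5 -0.5]
#  [ 0.   0.   1. ]
#  [ 0.   0.   0. ]]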
However, only a fraction $\alpha$ of the heat on a given node is allowed to flow at any given step. The remaining fraction of the heat, $1 - \alpha$, stays put.
At time $t$, the heat flowing into node $V_i$, the heat flowing out of it, and the net change are:
$\delta_{in}^t(i) = \sum_{j=1}^{|V|} h_j^t W_{j, i} = h^t W_{., i}$
$\delta_{out}^t(i) = \sum_{j=1}^{|V|} h_i^t W_{i, j}$
$\delta^t(i) = \delta_{in}^t(i) - \delta_{out}^t(i)$
Using step size $\alpha$, the new heat at time point $t + 1$ is
$h^{t+1}_i = (1 - \alpha) h^t_i + \alpha \delta^t(i)$
Therefore
$h^{t+1} = (1 - \alpha) h^t + \alpha \delta^t$
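For concreteness, here is a minimal sketch (hypothetical numbers, reusing the normalized matrix from the sketch above) of a single update step as the diffusers below compute it, where $\delta$ is taken as the incoming heat $h^t W$ and the $(1 - \alpha)$ retention factor plays the role of the outflow term:
# Minimal sketch of one insulated update: h_{t+1} = (1 - alpha) * h_t + alpha * (h_t @ W),
# using a hypothetical row-normalized signed weight matrix.
import numpy as np

alpha = 0.1
W = np.array([
    [0.0, -0.5, -0.5],
    [0.0,  0.0,  1.0],
    [0.0,  0.0,  0.0],
])
h = np.array([5.0, 2.0, 2.0])
delta = h @ W                             # heat arriving at each node
h_next = (1 - alpha) * h + alpha * delta
print(h_next)                             # [4.5  1.55 1.75]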
In [8]:
class BaseDiffuser(ABC):
    def __init__(self, graph: nx.DiGraph, alpha: float, steps: Optional[int] = None) -> None:
        self.alpha = alpha
        self.deltas = []
        self.heats = []
        self.steps = steps or int(30 / self.alpha)
        self.weights = self.calculate_weights(graph)

    @staticmethod
    @abstractmethod
    def calculate_weights(graph):
        raise NotImplementedError

    @abstractmethod
    def run(self, heat, tqdm_kwargs=None) -> None:
        raise NotImplementedError

    def _plot_diffusion_title(self):
        return f'Diffusion ($\\alpha={self.alpha}$)'

    def plot(self, heat_plt_kwargs=None, deriv_plt_kwargs=None):
        fig, (lax, rax) = plt.subplots(1, 2)

        lax.set_title(self._plot_diffusion_title())
        lax.set_ylabel('Heat')
        lax.set_xlabel('Time')
        pd.DataFrame(self.heats).plot.line(ax=lax, logx=True, **(heat_plt_kwargs or {}))

        rax.set_title('Derivative of Sum of Absolute Heats')
        rax.set_ylabel('Change in Sum of Absolute Heats')
        rax.set_xlabel('Time')
        derivative = [
            (x2 - x1)
            for x1, x2 in zip(self.deltas, self.deltas[1:])
        ]
        pd.DataFrame(derivative).plot.line(ax=rax, logx=True, legend=False, **(deriv_plt_kwargs or {}))

        plt.tight_layout(rect=[0, 0, 1, 0.95])
        return fig, (lax, rax)
    @classmethod
    def optimize_alpha_multirun(cls, graph, alphas, heat):
        alpha_heats = {}
        alpha_deltas = {}
        for alpha in alphas:
            diffuser = cls(graph, alpha)
            diffuser.run(heat)
            alpha_heats[alpha] = diffuser.heats
            alpha_deltas[alpha] = diffuser.deltas
        return alpha_deltas, alpha_heats
    @classmethod
    def optimize_alpha_multiplot(cls, graph, alphas, heat, heat_plt_kwargs=None, deriv_plt_kwargs=None):
        ds, hs = cls.optimize_alpha_multirun(graph, alphas, heat)
        cls._optimize_alpha_multiplot_helper(hs, plt_kwargs=heat_plt_kwargs)
        cls._optimize_alpha_multiplot_deriv_helper(ds, plt_kwargs=deriv_plt_kwargs)

    @staticmethod
    def _optimize_alpha_multiplot_helper(hs, plt_kwargs=None):
        fig, axes = plt.subplots(n_subplots_y, n_subplots_x)
        for alpha, ax in zip(alphas, axes.ravel()):
            ax.set_title(f'$\\alpha={alpha}$')
            ax.set_ylabel('Heat')
            ax.set_xlabel('Time')
            pd.DataFrame(hs[alpha]).plot.line(ax=ax, logx=True, **(plt_kwargs or {}))
        plt.suptitle('Diffusion')
        plt.tight_layout(rect=[0, 0, 1, 0.95])
    @staticmethod
    def _optimize_alpha_multiplot_deriv_helper(ds, plt_kwargs=None):
        fig, axes = plt.subplots(n_subplots_y, n_subplots_x)
        for alpha, ax in zip(ds, axes.ravel()):
            ax.set_title(f'$\\alpha={alpha}$')
            ax.set_ylabel('Change in Sum of Heats')
            ax.set_xlabel('Time')
            derivative = [
                (x2 - x1)
                for x1, x2 in zip(ds[alpha], ds[alpha][1:])
            ]
            pd.DataFrame(derivative).plot.line(ax=ax, logx=True, legend=False, **(plt_kwargs or {}))
        plt.suptitle('Derivative of Sum of Absolute Heats')
        plt.tight_layout(rect=[0, 0, 1, 0.95])
    @classmethod
    def multiplot(cls, graphs_and_heats, alpha):
        for graph, init_h in graphs_and_heats:
            d = cls(graph, alpha=alpha)
            d.run(init_h)
            fig, axes = d.plot(heat_plt_kwargs=dict(legend=False))
            fig.suptitle(graph.name)
            plt.show()


class InsulatedDiffuser(BaseDiffuser):
    def run(self, heat, tqdm_kwargs=None) -> None:
        for _ in tqdm(range(self.steps), leave=False, desc=f'alpha: {self.alpha}'):
            delta = heat @ self.weights
            self.deltas.append(np.sum(np.abs(delta)))
            heat = (1 - self.alpha) * heat + self.alpha * delta
            self.heats.append(heat)


class Diffuser(InsulatedDiffuser):
    @staticmethod
    def calculate_weights(graph):
        adj = nx.to_numpy_array(graph)
        return normalize(adj, norm='l1')
In [9]:
example_1_graph = nx.DiGraph()
example_1_graph.name = 'Example 1 - Small Decreasing Graph'
example_1_graph.add_edges_from([
    ('A', 'B', dict(weight=-1)),
    ('A', 'C', dict(weight=-1)),
    ('B', 'C', dict(weight=+1)),
])
plt.figure(figsize=(3, 3))
draw(example_1_graph)
plt.title(f'Visualization of {example_1_graph.name}')
plt.show()
In [10]:
example_1_init_h = np.array([5.0, 2.0, 2.0])  # initial heats for nodes A, B, C (insertion order)
Diffuser.optimize_alpha_multiplot(example_1_graph, alphas, example_1_init_h)
In [11]:
example_2_graph = nx.scale_free_graph(n=20, alpha=.31, beta=.64, gamma=.05)
example_2_graph.name = 'Example 2 - Random Graph with Even Polarity'
assign_bernoulli_polarity(example_2_graph, p=0.5)
draw(example_2_graph)
In [12]:
example_2_init_h = np.random.normal(size=example_2_graph.number_of_nodes())
Diffuser.optimize_alpha_multiplot(example_2_graph, alphas, example_2_init_h, heat_plt_kwargs=dict(legend=False))
In [13]:
example_3_graph = nx.scale_free_graph(n=20, alpha=.31, beta=.64, gamma=.05)
example_3_graph.name = 'Example 3 - Random Graph with Mostly Negative Polarity'
assign_bernoulli_polarity(example_3_graph, p=0.3)
example_3_init_h = np.random.normal(size=example_3_graph.number_of_nodes())
diffuser = Diffuser(example_3_graph, alpha=0.01)
diffuser.run(example_3_init_h)
diffuser.plot(heat_plt_kwargs=dict(legend=False))
plt.show()
In [14]:
example_4_graph = nx.scale_free_graph(n=20, alpha=.31, beta=.64, gamma=.05)
example_4_graph.name = 'Example 4 - Random Graph with Mostly Positive Polarity'
assign_bernoulli_polarity(example_4_graph, p=0.7)
example_4_init_h = np.random.normal(size=example_4_graph.number_of_nodes())
diffuser = Diffuser(example_4_graph, alpha=0.01)
diffuser.run(example_4_init_h)
diffuser.plot(heat_plt_kwargs=dict(legend=False))
plt.show()
In [15]:
example_5_graph = nx.DiGraph()
example_5_graph.name = 'Example 5 - Small Increasing Graph'
example_5_graph.add_edges_from([
    (0, 1, dict(weight=+1)),
    (0, 2, dict(weight=+1)),
    (1, 2, dict(weight=+1)),
])
plt.figure(figsize=(3, 3))
draw(example_5_graph)
plt.title(f'Visualization of {example_5_graph.name}')
plt.show()
In [16]:
example_5_init_h = np.random.normal(size=example_5_graph.number_of_nodes())
diffuser = Diffuser(example_5_graph, alpha=0.01)
diffuser.run(example_5_init_h)
diffuser.plot()
plt.show()
In [17]:
example_6_graph = nx.DiGraph()
example_6_graph.name = 'Example 6 - Small Chaotic Increasing Graph'
example_6_graph.add_edges_from([
    (0, 1, dict(weight=+1)),
    (1, 2, dict(weight=+1)),
    (2, 0, dict(weight=+1)),
])
plt.figure(figsize=(3, 3))
draw(example_6_graph)
plt.title(f'Visualization of {example_6_graph.name}')
plt.show()
In [18]:
example_6_init_h = np.random.normal(size=example_6_graph.number_of_nodes())
diffuser = Diffuser(example_6_graph, alpha=0.01)
diffuser.run(example_6_init_h)
diffuser.plot()
plt.show()
This is the first example of a system coming to a non-zero steady state! One reason is that any system containing a sink node will continually hemorrhage heat: the sink's heat is scaled by $(1 - \alpha)$ at every step, but it has no out-edges through which to pass that heat along, so it simply leaves the system.
Some ideas on how to deal with this, explored below, are to give every node a self-loop of weight $+1$ (so each node recycles part of its own heat) or of weight $-1$.
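To see the leak concretely, here is a minimal sketch (a hypothetical two-node chain 0 → 1, where node 1 is a sink): without self-loops the total heat decays toward zero, while adding a +1 self-loop to every node, as the class below does, conserves it.
# Minimal sketch: heat leaking out at a sink versus self-loops conserving it.
import numpy as np
from sklearn.preprocessing import normalize

alpha = 0.1
adj = np.array([
    [0.0, 1.0],  # 0 -> 1
    [0.0, 0.0],  # node 1 is a sink
])

def total_heat_after(adjacency, steps=500):
    weights = normalize(adjacency, norm='l1')
    heat = np.array([1.0, 1.0])
    for _ in range(steps):
        heat = (1 - alpha) * heat + alpha * (heat @ weights)
    return heat.sum()

print(total_heat_after(adj))              # ~0: the sink bleeds heat every step
print(total_heat_after(adj + np.eye(2)))  # ~2.0: self-loops keep the heat in the system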
In [19]:
example_graphs = [
    (example_1_graph, example_1_init_h),
    (example_2_graph, example_2_init_h),
    (example_3_graph, example_3_init_h),
    (example_4_graph, example_4_init_h),
    (example_5_graph, example_5_init_h),
    (example_6_graph, example_6_init_h),
]
In [20]:
class SelfConnectedInsulatedDiffuser(InsulatedDiffuser):
    """Insulated diffuser that adds a +1 self-loop to every node before normalizing."""

    def _plot_diffusion_title(self):
        return f'Self-Connected Insulated Diffusion ($\\alpha={self.alpha}$)'

    @staticmethod
    def calculate_weights(graph):
        adj = nx.to_numpy_array(graph)
        for i in range(adj.shape[0]):
            adj[i, i] = 1.0
        return normalize(adj, norm='l1')


SelfConnectedInsulatedDiffuser.multiplot(example_graphs, alpha=0.01)
In [21]:
class AntiSelfConnectedInsulatedDiffuser(InsulatedDiffuser):
    """Insulated diffuser that adds a -1 self-loop to every node before normalizing."""

    def _plot_diffusion_title(self):
        return f'Anti-Self-Connected Insulated Diffusion ($\\alpha={self.alpha}$)'

    @staticmethod
    def calculate_weights(graph):
        adj = nx.to_numpy_array(graph)
        for i in range(adj.shape[0]):
            adj[i, i] = -1.0
        return normalize(adj, norm='l1')


AntiSelfConnectedInsulatedDiffuser.multiplot(example_graphs, alpha=0.01)